home *** CD-ROM | disk | FTP | other *** search
/ Amiga Tools 3 / Amiga Tools 3.iso / grafik / raytracing / rayshade-4.0.6.3 / inetray / comm.c < prev    next >
C/C++ Source or Header  |  1993-08-19  |  15KB  |  585 lines

  1. /*======================================================================
  2.                     C O M M . C 
  3.                     doc: Thu Mar  5 12:32:02 1992
  4.                     dlm: Thu Aug 19 20:50:21 1993
  5.                     (c) 1992 ant@julia
  6.                     uE-Info: 68 70 T 0 11 72 2 2 8 ofnI
  7. ======================================================================*/
  8.  
  9. /*#define        VERBOSE        /* show retrys */
  10.  
  11. #include    <stdio.h>
  12. #include    <signal.h>
  13. #include    <netdb.h>    /* prefer system over rpc/netdb.h */
  14. #include    <errno.h>
  15. #include    <sys/time.h>
  16. #include    <sys/types.h>
  17. #include    <sys/socket.h>
  18. #include    <sys/param.h>
  19. #include    <rpc/rpc.h>
  20. #include    <arpa/inet.h>
  21. #include    <netinet/in.h>
  22. #include    "inetray.h"
  23. #include    "inetray.start.h"
  24. #include    "rcfile.h"
  25. #include    "config.h"
  26. #include    "common.h"
  27. #include    "utils.h"
  28. #include    "version.h"
  29.  
  30. hInfo        *hosts = NULL;        /* list of hosts */
  31. int        sfdmax = 0;        /* max fd for ressocks */
  32. int        nRunning = 0;        /* number of workers running */
  33.  
  34. static int    sock;            /* listening socket */
  35.  
  36. extern int    errno;            /* to restart syscalls */
  37. extern int    key;            /* session key */
  38.  
  39. void shutDown(sig)            /* shut down operation */
  40. int sig;
  41. {
  42.     int i,res;
  43.     void killAll(),flushAll(),waitAll(),terminateAll(),closeAll();
  44.  
  45.     for (i=1; i<NSIG; i++)            /* ignore all signals */
  46.         signal(i,SIG_IGN);
  47.     fprintf(stderr,"\nShutting down");
  48.     if (sig != 0)
  49.         fprintf(stderr," due to Signal #%d",sig);
  50.     fprintf(stderr,"... [kill"); 
  51.     killAll();                /* kill workers */
  52.     fprintf(stderr,", flush");
  53.     flushAll();                /* flush result sockets */
  54.     fprintf(stderr,", wait");
  55.     waitAll();                /* wait for workers */
  56.     fprintf(stderr,", terminate");
  57.     terminateAll();                /* terminate servers */
  58.     fprintf(stderr,", close");
  59.     closeAll();                /* close sockets */
  60.     fprintf(stderr,"]\n");
  61.     exit(0);
  62. }
  63.  
  64. void abort1(host)            /* this one is broken */
  65. hInfo *host;
  66. {
  67.     fprintf(stderr,
  68.         "\tWarning: Lost connection with %s(%d) (check remote error messages)\n",
  69.             host->name,host->wid);
  70.     host->broken = TRUE;        /* this host crashed */
  71.     close(host->sock);
  72.     if (--nRunning == 0) {        /* still servers running? */
  73.             fprintf(stderr,"We're alone now...");
  74.             shutDown(0);
  75.     }
  76. }
  77.  
  78. void IRbroadcast(fNum,inP,in)        /* bcast INIT or TERMINATE */
  79. u_long fNum; char *in; xdrproc_t inP;
  80. {
  81.     int     i,res,exists,uid,nSvcs;
  82.     char    *addr;
  83.  
  84.     exists = getHost(TRUE,&addr,&nSvcs,&uid);    /* hosts */
  85.     while (exists) {
  86.         if (fNum == INIT) {            /* INIT is special */
  87.             ((iPrm *)in)->uid = uid;
  88.             ((iPrm *)in)->nSvcs = nSvcs;
  89.             if (nSvcs > 1) nSvcs = 1;
  90.         }
  91.         for (i=0; i<nSvcs; i++) {
  92.             res = sendrpc(addr,INETRAY+i,IRV1,fNum,
  93.                     inP,in,xdr_void,NULL);
  94.             if ((res != 0) &&
  95.                 ((fNum != TERMINATE) || (res != RPC_PROGNOTREGISTERED))) {
  96.                 fprintf(stderr,
  97.                     "\nWarning: sendrpc(%s,%d): ",
  98.                     addr,fNum);
  99.                 clnt_perrno(res);
  100.                 addr[0] = '\0';        /* skip in future */
  101.             }
  102.         }
  103.         exists = getHost(FALSE,&addr,&nSvcs,&uid);
  104.     }
  105.  
  106.     exists = getNet(TRUE,&addr,&nSvcs,&uid);    /* nets */
  107.     while (exists) {
  108.         if (fNum == INIT) {
  109.             ((iPrm *)in)->uid = uid;
  110.             ((iPrm *)in)->nSvcs = nSvcs;
  111.             if (nSvcs > 1) nSvcs = 1;
  112.         }
  113.         for (i=0; i<nSvcs; i++) {
  114.             res = clnt_dirbcast(addr,INETRAY+i,IRV1,fNum,
  115.                         inP,in,xdr_void,NULL,NULL);
  116.             if (res != 0) {
  117.                 fprintf(stderr,
  118.                     "\nWarning: clnt_dirbcast(%s,%d): ",
  119.                     addr,fNum);
  120.                 clnt_perrno(res);
  121.                 addr[0] = '\0';        /* skip in future */
  122.             }
  123.         }
  124.         exists = getNet(FALSE,&addr,&nSvcs,&uid);
  125.     }
  126.  
  127.     getLocal(&addr,&nSvcs,&uid);            /* LAN */
  128.         if (fNum == INIT) {
  129.                 ((iPrm *)in)->uid = uid;
  130.         ((iPrm *)in)->nSvcs = nSvcs;
  131.         if (nSvcs > 1) nSvcs = 1;
  132.     }
  133.         for (i=0; i<nSvcs; i++) {
  134.             res = clnt_broadcast(INETRAY+i,IRV1,fNum,
  135.                                     inP,in,xdr_void,NULL,NULL);
  136.             if (res != 0) {
  137.                     fprintf(stderr,
  138.                             "\nWarning: clnt_broadcast(%d): ",fNum);
  139.                     clnt_perrno(res);
  140.         }
  141.     }
  142. }
  143.  
  144. void IRSbroadcast()                    /* bcast START */
  145. {
  146.     int     res,exists,nSvcs,uid;
  147.     char    *addr;
  148.  
  149.     exists = getHost(TRUE,&addr,&nSvcs,&uid);    /* hosts */
  150.     while (exists) {
  151.         if (nSvcs > 0) {
  152.                     res = sendrpc(addr,STARTER,IRSV1,START,
  153.                                   xdr_void,NULL,xdr_void,NULL);
  154.                     if ((res != 0) && (res != RPC_PROGNOTREGISTERED)) {
  155.                             fprintf(stderr,
  156.                                     "\nWarning: sendrpc(%s,START): ",addr);
  157.                             clnt_perrno(res);
  158.                 addr[0] = '\0';        /* skip in future */
  159.             }
  160.         }
  161.         exists = getHost(FALSE,&addr,&nSvcs,&uid);
  162.     }
  163.  
  164.     exists = getNet(TRUE,&addr,&nSvcs,&uid);    /* nets */
  165.     while (exists) {
  166.         if (nSvcs > 0) {
  167.                     res = clnt_dirbcast(addr,STARTER,IRSV1,START,
  168.                                         xdr_void,NULL,xdr_void,NULL,NULL);
  169.                     if (res != 0) {
  170.                             fprintf(stderr,
  171.                                     "\nWarning: clnt_dirbcast(%s,START): ",addr);
  172.                             clnt_perrno(res);
  173.             }
  174.         }
  175.         exists = getNet(FALSE,&addr,&nSvcs,&uid);
  176.     }
  177.  
  178.     getLocal(&addr,&nSvcs,&uid);            /* LAN */
  179.     if (nSvcs > 0) {
  180.             res = clnt_broadcast(STARTER,IRSV1,START,
  181.                                  xdr_void,NULL,xdr_void,NULL,NULL);
  182.             if (res != 0) {
  183.                     fprintf(stderr,"\nWarning: clnt_broadcast(START): ");
  184.                     clnt_perrno(res);
  185.         }
  186.     }
  187. }
  188.  
  189. static readit(fdp,buf,nbyte)            /* xdr aux routine */
  190. char *fdp,*buf; int nbyte;
  191. {
  192.     int nread,fd;
  193.  
  194.     fd = *(int *)fdp;
  195. reStartRead:
  196.     nread = read(fd,buf,nbyte);
  197.     if (nread < 0) {
  198.         if (errno == EINTR) {
  199.             goto reStartRead;
  200.         }
  201.         perror("read");
  202.     }
  203.     if (nread == 0) nread = -1;
  204.     return nread;
  205. }
  206.  
  207. static void getAnswers()                /* wait for answers */
  208. {
  209.     int    aLen,res;
  210.     hInfo    *new,*old,*cur;
  211.     fd_set    sockSet;
  212.     char    buf[64],*cp;
  213.     struct hostent         *host;
  214.     struct servent         *s;
  215.     struct sockaddr_in    addr;
  216.     struct timeval        tout,noDefault;
  217.  
  218.     tout.tv_sec = rTimeout;                /* timeout */
  219.     tout.tv_usec = 0;
  220.     noDefault.tv_sec = noDefault.tv_usec = -1;    /* use supplied val */
  221.     do {
  222.             do {
  223.         restartSelect:
  224.                 FD_ZERO(&sockSet);        /* select() on socket */
  225.                 FD_SET(sock,&sockSet);
  226.                     res = select(sock+1,&sockSet,NULL,NULL,&tout);
  227.                     if (res < 0) {
  228.                        if (errno == EINTR) goto restartSelect;
  229.                             perror("select");
  230.                             exit(1);
  231.             }
  232.         } while ((res == 0) && (nRunning < minWorkers));
  233.         if (res == 0) break;            /* done */
  234.         
  235.         new = (hInfo *)malloc(sizeof(hInfo));     /* alloc node */
  236.         if (new == NULL) {
  237.             fprintf(stderr,"malloc() failed\n");
  238.             exit(1);
  239.         }
  240.  
  241.         new->reTrys = 0;
  242.                aLen = sizeof(addr);            /* accept result */
  243.         new->sock = accept(sock,&addr,&aLen);
  244.         if (new->sock < 0) {
  245.             perror("accept");
  246.             exit(1);
  247.         }
  248.         sprintf(buf,"<%s>",inet_ntoa(addr.sin_addr));
  249.         fprintf(stderr,"\n  %-18.17s",buf);
  250.         if (new->sock > sfdmax) sfdmax = new->sock;
  251.  
  252.         xdrrec_create(&(new->xdrs),0,0,        /* make xdr_stuff */
  253.                 (char *)&(new->sock),readit,NULL);
  254.         new->xdrs.x_op = XDR_DECODE;
  255.             if (!xdrrec_skiprecord(&(new->xdrs))) {    /* read next record*/
  256.                     fprintf(stderr,"xdrrec_skiprecord() failed\n");
  257.                     exit(1);
  258.         }
  259.                 if (!xdr_int(&(new->xdrs),&(new->wid))) {/* read info */
  260.             fprintf(stderr,"xdr_int() failed\n");
  261.             exit(1);
  262.         }
  263.                 if (!xdr_int(&(new->xdrs),&(new->pid))) {
  264.             fprintf(stderr,"xdr_int() failed\n");
  265.             exit(1);
  266.         }
  267.         cp = new->rUser;
  268.                 if (!xdr_string(&(new->xdrs),&cp,32)) {
  269.             fprintf(stderr,"xdr_string() failed\n");
  270.             exit(1);
  271.         }
  272.         cp = new->rwd;
  273.                 if (!xdr_string(&(new->xdrs),&cp,MAXPATHLEN)) {
  274.             fprintf(stderr,"xdr_string() failed\n");
  275.             exit(1);
  276.         }
  277.         cp = new->rVersion;
  278.                 if (!xdr_string(&(new->xdrs),&cp,8)) {
  279.             fprintf(stderr,"xdr_string() failed\n");
  280.             exit(1);
  281.         }
  282.         
  283.         host = gethostbyaddr(&addr.sin_addr,    /* create client */
  284.             sizeof(addr.sin_addr),AF_INET);
  285.         if (host == NULL) {
  286.             perror("gethostbyaddr");
  287.             exit(1);
  288.         }
  289.         strncpy(new->name,host->h_name,MAXHOSTNAMELEN);
  290.         memcpy((char *)&new->addr,
  291.             (char *)&addr.sin_addr,
  292.             sizeof(addr.sin_addr));
  293.         new->clnt = clnt_create(new->name,
  294.                 INETRAY+new->wid,IRV1,"udp");
  295.         new->done = 0;
  296.         if (new->clnt == NULL) {
  297.             fprintf(stderr,"Warning: clnt_create(%s,%d)",new->name,new->wid);
  298.             clnt_pcreateerror("");
  299.             new->done = -1;
  300.         } else if (!clnt_control(new->clnt,CLSET_TIMEOUT,&noDefault)) {
  301.             fprintf(stderr,"clnt_control() failed\n");
  302.             exit(1);
  303.         }
  304.  
  305.         for (old = NULL, cur = hosts;        /* sort into list */
  306.              (cur != NULL) &&
  307.              (cur->wid > new->wid);
  308.              old = cur, cur = cur->next) ;
  309.         new->next = cur;            /* link it in list */
  310.         if (old == NULL) hosts = new;
  311.         else         old->next = new;
  312.  
  313.         sprintf(buf,"[%s]",new->rVersion);    /* version # */
  314.         fprintf(stderr,"%-8.7s",buf);
  315.  
  316.         fprintf(stderr,"%-14.13s",new->name);    /* name */
  317.         fprintf(stderr,"%5d ",new->pid);    /* pid */
  318.  
  319.         if (!doThis(new->name) ||        /* drop it */
  320.                 (nRunning == maxWorkers)) {        
  321.             new->done = -1;            /* sentinel */
  322.             fprintf(stderr,"IGNORED");
  323.         } else if (new->rVersion[0] != VERSION[0]) {
  324.             new->done = -1;            /* sentinel */
  325.             fprintf(stderr,"VERSION MISMATCH");
  326.         } else if (new->done == 0) {
  327.             new->broken = FALSE;
  328.             nRunning++;            /* found one */
  329.             fprintf(stderr,"%-10.9s%s",new->rUser,new->rwd);
  330.         }
  331.     } FOREVER;
  332.  
  333.     for (old = NULL, cur = hosts;            /* drop from list */
  334.          cur != NULL;
  335.          cur = cur->next) {
  336.         if (cur->done == 0) {
  337.             old = cur;
  338.             continue;
  339.         }
  340.         if (cur->clnt != NULL) {        /* create failed */
  341.             res = clnt_call(cur->clnt,TERMINATE,
  342.                         xdr_int,&key,
  343.                         xdr_void,NULL,
  344.                         now); 
  345.             if (res != 5) {
  346.                 fprintf(stderr,"clnt_call(%s)",cur->name);
  347.                 clnt_perrno(res);
  348.                 exit(1);
  349.             }
  350.         }
  351.         if (old == NULL) hosts = cur->next;    /* drop from list */
  352.         else          old->next = cur->next;
  353.     }
  354. }
  355.  
  356. void registerSvc(hName,cLine,cwd,sIn)    /* register servers */
  357. char *hName,*cLine,*cwd,sIn;        /* params for INIT */
  358. {
  359.     int    i;
  360.     iPrm    param;
  361.     char    bindErr = FALSE;
  362.     struct sockaddr_in addr;
  363.  
  364.     param.rName = hName;        /* compose parameters */
  365.     param.cmdLine = cLine;
  366.     stripHead(STRIPHEAD,cwd);
  367.     stripHome(geteuid(),cwd);
  368.     param.cwd = cwd;
  369.     param.key = key;
  370.     param.rPort = portNumber;
  371.     param.sendIn = sIn;
  372.     
  373.     /* listen on socket */
  374.     if ((sock = socket(PF_INET,SOCK_STREAM,0)) < 0) {
  375.         perror("socket");
  376.         exit(1);
  377.     }
  378.     addr.sin_family = AF_INET;
  379.     addr.sin_port = htons(portNumber);
  380.     addr.sin_addr.s_addr = INADDR_ANY;
  381.     restartBind:
  382.     if (bind(sock,&addr,sizeof(addr)) < 0) {
  383.         if (errno == EADDRINUSE) {
  384.             if (!bindErr) {
  385.                 fprintf(stderr,"\nAddress in use! Retrying: .");
  386.                 bindErr = TRUE;
  387.             } else  fprintf(stderr,".");
  388.             sleep(5);
  389.             goto restartBind;
  390.         }
  391.         perror("bind");
  392.         exit(1);
  393.     }
  394.     if (listen(sock,5) < 0) {
  395.         perror("listen");
  396.         exit(1);
  397.     }
  398.  
  399.     fprintf(stderr,"start"); 
  400.     IRSbroadcast();                /* start servers */
  401.     fprintf(stderr,", delay"); 
  402.     sleep(rTimeout);            /* allow startup */
  403.     fprintf(stderr,", init"); 
  404.     IRbroadcast(INIT,xdr_iPrm,¶m);    /* start all workers */
  405.     fprintf(stderr,"]"); 
  406.     getAnswers();                /* wait for replys */
  407.     close(sock);
  408. }
  409.  
  410. void distribute(bp)            /* distributed buffered stdin */
  411. int bp;
  412. {
  413.     hInfo    *host;
  414.     char    buf[STRLEN];
  415.     int    nRead;
  416.  
  417.     do {
  418.         nRead = read(bp,buf,STRLEN);
  419.         if (nRead < 0) {
  420.             perror("read");
  421.             exit(1);
  422.         }
  423.         if (nRead == 0) {
  424.             buf[0] = '\0';
  425.             nRead = 1;
  426.         }
  427.         for (host=hosts;host!=NULL;host=host->next) {
  428.             if (write(host->sock,buf,nRead) != nRead) {
  429.                 perror("write");
  430.                 exit(1);
  431.             }
  432.         }
  433.     } while (buf[nRead-1] != '\0');
  434. }
  435.  
  436. void closeAll()                /* close all sockets */
  437. {
  438.     hInfo     *host;
  439.     
  440.     for (host = hosts; host != NULL; host = host->next) {
  441.         if (host->broken) continue;
  442.         xdr_destroy(&(host->xdrs));
  443.         clnt_destroy(host->clnt);
  444.         close(host->sock);
  445.     }
  446. }
  447.  
  448. void flushAll()                /* flush all sockets */
  449. {
  450.     hInfo     *host;
  451.     fd_set     sockSet;
  452.     char    buf[1024];
  453.     int    res;
  454.     struct timeval tout;
  455.  
  456.     tout.tv_sec = FLUSHTIMEOUT;
  457.     tout.tv_usec = 0;
  458.     do {
  459.     restartSelect:
  460.             FD_ZERO(&sockSet);
  461.         for (host = hosts; host != NULL; host = host->next)
  462.             if (!host->broken)
  463.                 FD_SET(host->sock,&sockSet);
  464.             res = select(sfdmax+1,&sockSet,NULL,NULL,&tout);
  465.                if (res < 0) {
  466.                    if (errno == EINTR) goto restartSelect;
  467.                     perror("select");
  468.             exit(1);
  469.         }
  470.         if (res == 0) return;    /* done */
  471.         for (host = hosts; host != NULL; host = host->next) {
  472.             if (host->broken) continue;
  473.             if (FD_ISSET(host->sock,&sockSet)) {
  474.                 if (read(host->sock,buf,1024) < 0) {
  475.                     perror("read");
  476.                     exit(1);
  477.                 }
  478.             }
  479.         }
  480.         } FOREVER;
  481. }
  482.  
  483. void killAll()                /* kill all servers */
  484. {
  485.     hInfo     *host;
  486.     int    res,nTrys,rpcerr;
  487.  
  488.     for (host = hosts; host != NULL; host = host->next) {
  489.         if (host->broken) continue;
  490.         for (nTrys=CALLRETRY; nTrys > 0; nTrys--) {
  491.             rpcerr = clnt_call(host->clnt,KILL,
  492.                        xdr_int,&key,
  493.                        xdr_int,&res,
  494.                        callTOut);
  495.             if (rpcerr == RPC_SUCCESS) break;
  496.             if (rpcerr == RPC_TIMEDOUT) {
  497. #ifdef VERBOSE
  498.                 fprintf(stderr,
  499.                     "KILL retry to %s(%d)\n",
  500.                     host->name,host->wid);
  501. #endif
  502.                 host->reTrys++;
  503.                 if (nTrys > 1) continue;
  504.             }
  505.             fprintf(stderr,"kill(): ");
  506.             abort1(host);
  507.         }
  508.     }
  509. }
  510.  
  511. void waitAll()                /* wait on all clients */
  512. {
  513.     hInfo     *host;
  514.     int    res,nTrys,rpcerr;
  515.  
  516.     for (host = hosts; host != NULL; host = host->next) {
  517.         if (host->broken) continue;
  518.         for (nTrys=CALLRETRY; nTrys > 0; nTrys--) {
  519.             rpcerr = clnt_call(host->clnt,WAIT,
  520.                        xdr_int,&key,
  521.                        xdr_int,&res,
  522.                        callTOut);
  523.             if (rpcerr == RPC_SUCCESS) break;
  524.             if (rpcerr == RPC_TIMEDOUT) {
  525. #ifdef VERBOSE
  526.                 fprintf(stderr,
  527.                     "WAIT retry to %s(%d)\n",
  528.                     host->name,host->wid);
  529. #endif
  530.                 host->reTrys++;
  531.                 if (nTrys > 1) continue;
  532.             }
  533.             fprintf(stderr,"wait(): ");
  534.             abort1(host);
  535.         }
  536.     }
  537. }
  538.  
  539. void terminateAll()            /* terminate all svcs */
  540. {
  541.     hInfo     *host;
  542.     fd_set     sockSet;
  543.     int    res,n,time=0;
  544.  
  545.     IRbroadcast(TERMINATE,xdr_int,&key);    /* terminate all */
  546.     sleep(1); time++;
  547.  
  548.     while (time < TERMTIMEOUT) {
  549.     restartSelect:
  550.             FD_ZERO(&sockSet);
  551.             for (n=0,host=hosts; host!=NULL; host=host->next) {
  552.                 if (host->broken) continue;
  553.             FD_SET(host->sock,&sockSet);
  554.             n++;
  555.         }
  556.             res = select(sfdmax+1,&sockSet,NULL,NULL,&now);
  557.                if (res < 0) {
  558.                    if (errno == EINTR) goto restartSelect;
  559.                     perror("select");
  560.             exit(1);
  561.         }
  562.         if (n == res) return;        /* done */
  563.         for (host = hosts; host != NULL; host = host->next) {
  564.             host->broken = FD_ISSET(host->sock,&sockSet);
  565.             if (!host->broken) {
  566. #ifdef VERBOSE
  567.                 fprintf(stderr,
  568.                     "TERM retry to %s(%d)\n",
  569.                     host->name,host->wid);
  570. #endif
  571.                 host->reTrys++;
  572.                 (void)clnt_call(host->clnt,TERMINATE,
  573.                         xdr_int,&key,
  574.                         xdr_void,NULL,now);
  575.             }
  576.         }
  577.         sleep(1); time++;
  578.         }
  579.         fprintf(stderr,"\nWarning: The following servers may not have terminated:\n");
  580.         for (host = hosts; host != NULL; host = host->next) {
  581.                 if (!FD_ISSET(host->sock,&sockSet)) 
  582.                         fprintf(stderr,"\t%s[%d]\n",host->name,host->pid);
  583.         }
  584. }
  585.